package com.huan.filter.aggreagte;

import com.huan.filter.vo.Person;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * reduce reduce可以对已有的数据进行归约处理，把每一个新输入的数据和当前已经归约出来的值，再做一个聚合计算
 *
 * @author huan.fu
 * @date 2023/9/18 - 23:02
 */
public class ReduceApplication {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        environment.fromElements(
                        new Person(1, "张三", 20),
                        new Person(1, "李四", 20),
                        new Person(2, "王五", 25),
                        new Person(2, "赵六", 25),
                        new Person(5, "田七", 30)
                )
                .keyBy(Person::getId)
                .reduce(new ReduceFunction<Person>() {
                    /**
                     * reduce 规约操作： reduce可以对根据keyBy分组后的数据进行归约处理，把每一个新输入的数据和当前已经归约出来的值，再做一个聚合计算
                     */
                    @Override
                    public Person reduce(Person person, Person person2) throws Exception {
                        return new Person(person.getId(), person.getName(), person.getAge() + person2.getAge());
                    }
                })
                .print();

        environment.execute("reduce operation");
    }
}
